package com.offcn.bigdata.streaming.p2

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * SparkStreaming中的操作算子学习
  *     transform
  *         就转换算子，transform(rdd => rdd)，遍历该dstream中的每个rdd执行相应的业务逻辑，结果集仍为rdd，那么这些新的rdd又构成了新的dstream
  *         所以我们前面学习的stream中的算子，比如map其实就是
  *         dstream.map(rdd => rdd.map(xxx))
  *         dstream.flatMap(rdd => rdd.flatMap(xxx))
  *
  *         比较常见的使用场景：
  *             dstream处理的是流式数据集，但是一些字典数据集通常都是离线数据集，所以不可避免的会涉及到流式的表和离线的表的关联操作。
  *                 比如，地区字典表信息，再比如用户的点击流数据和黑名单
  *                 问题是关联操作---join，但是很遗憾，dstream的join只能是dstream.join(otherDStream)
  *                 现在要做的是dstream.join(rdd)
  *
  *                 rdd的关联只能和rdd进行关联，所以如果能从dstream中找到rdd，就可以和被关联的rdd执行关联操作，那么请问怎么找到dstream中的rdd？
  *                 dstream.transform(rdd => rdd.join(otherRDD))
  *
  *             dstream的分区数量太多了，我们想减少分区数，怎么做，需要说明的是，stream中提供了repartition，其实就是rdd的repartition，
  *                 是用来增大分区的，所以没有减少分区的操作
  *                 dstream.transform(_.coalesce(numPartitions, shuffle = true))
  *
  *         dstream的主要作用就是来给开发人员提供了一个入口，用来实现系统没能提供的相关api。
  *
  *     updateStateByKey
  *     window
  *     foreachRDD
  */
object _03StreamingOperatorTranformOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName("_02Streaming2KafkaOps")
            .setMaster("local[*]")
        //            .set("spark.streaming.kafka.maxRatePerPartition", "10")
        val batchInterval = Seconds(2)
        val ssc = new StreamingContext(conf, batchInterval)

        val blacklistRDD = ssc.sparkContext.parallelize(List(
            Blacklist("27.19.74.143", 1603853908733L, true),//就是黑名单数据
            Blacklist("110.52.250.126", 1603853908733L, false)//准备要从黑名单中移除，正在观察期
        )).map(backlist => (backlist.ip, backlist))


        val lines = ssc.socketTextStream("bigdata01", 9999)
        //27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127
        val joinedDStream: DStream[(String, String)] = lines.transform(rdd => {
            val ip2Info:RDD[(String, String)] = rdd.map(line => {
                val index = line.indexOf("##")
                val ip = line.substring(0, index)
                val info = line.substring(index + 2)
                (ip, info)
            })
            //关联离线数据集---黑名单
            val joinedRDD:RDD[(String, (String, Option[Blacklist]))] = ip2Info.leftOuterJoin(blacklistRDD)
            //过滤掉黑名单中的数据，保留正常数据集
            val filetered = joinedRDD.filter{case (ip, (info, blacklistOption)) => blacklistOption.isEmpty || !blacklistOption.get.usable}

            filetered.map{case (ip, (info, blacklistOption)) => (ip, info)}
        })
        joinedDStream.foreachRDD((rdd, bTime) => {
            if(!rdd.isEmpty()) {
                println("-------------------------------------------")
                println(s"Time: $bTime")
                println("-------------------------------------------")
                rdd.foreach(println)
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

case class Blacklist(ip: String, createTime: Long, usable: Boolean)